/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.catalog;

import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.apache.commons.compress.utils.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** Abstract catalog for any JDBC catalogs. */
public abstract class AbstractJdbcCatalog extends AbstractCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);

    protected final ClassLoader userClassLoader;
    protected final String username;
    protected final String pwd;
    protected final String baseUrl;
    protected final String defaultUrl;

    public AbstractJdbcCatalog(
            ClassLoader userClassLoader,
            String catalogName,
            String defaultDatabase,
            String username,
            String pwd,
            String baseUrl) {
        
        super(catalogName, defaultDatabase);

        checkNotNull(userClassLoader);
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));

        JdbcCatalogUtils.validateJdbcUrl(baseUrl);

        this.userClassLoader = userClassLoader;
        this.username = username;
        this.pwd = pwd;
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        this.defaultUrl = this.baseUrl + defaultDatabase;
    }

    @Override
    public void open() throws CatalogException {
        // load the Driver use userClassLoader explicitly, 
        // see FLINK-15635 for more detail
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(userClassLoader)) {
            // test connection, fail early if we cannot connect to database
            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
            } catch (SQLException e) {
                throw new ValidationException(
                        String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
            }
            LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
        }
    }

    @Override
    public void close() throws CatalogException {
        LOG.info("Catalog {} closing", getName());
    }

    // ----- getters ------

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return pwd;
    }

    public String getBaseUrl() {
        return baseUrl;
    }

    // ------ retrieve PK constraint ------
    protected Optional<UniqueConstraint> getPrimaryKey(
            DatabaseMetaData metaData, String database, String schema, String table)
            throws SQLException {
        
        // rs返回值：com.mysql.cj.jdbc.result.ResultSetImpl@687a762c
        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);

        Map<Integer, String> keySeqColumnName = new HashMap<>();
        
        String pkName = null;
        while (rs.next()) {
            //字段
            String columnName = rs.getString("COLUMN_NAME");
            
            // PRIMARY
            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
            int keySeq = rs.getInt("KEY_SEQ");
            
            //!keySeqColumnName.containsKey(keySeq - 1)这段代码用于判断 keySeqColumnName 集合中
            // 是否存在索引为 keySeq - 1 的元素。
            // 如果不存在，则返回 true，否则返回 false。
            Preconditions.checkState(
                    !keySeqColumnName.containsKey(keySeq - 1),
                    "The field(s) of primary key must be from the same table.");
            // 这行代码将键值对 (keySeq - 1, columnName) 添加到 keySeqColumnName集合中。
            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
        }
        
        List<String> pkFields =
                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
        
        keySeqColumnName.forEach(pkFields::set);
        
        if (!pkFields.isEmpty()) {
            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
        }
        
        return Optional.empty();
    }

    // ------ table factory ------

    @Override
    public Optional<Factory> getFactory() {
        return Optional.of(new JdbcDynamicTableFactory());
    }

    // ------ databases ------

    @Override
    public boolean databaseExists(String databaseName) throws CatalogException {
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

        return listDatabases().contains(databaseName);
    }

    @Override
    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public CatalogDatabase getDatabase(String databaseName)
            throws DatabaseNotExistException, CatalogException {

        Preconditions.checkState(
                !StringUtils.isNullOrWhitespaceOnly(databaseName),
                "Database name must not be blank.");
        if (listDatabases().contains(databaseName)) {
            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
        } else {
            throw new DatabaseNotExistException(getName(), databaseName);
        }
    }

    // ------ tables and views ------

    @Override
    public CatalogBaseTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {

        if (!tableExists(tablePath)) {
            throw new TableNotExistException(getName(), tablePath);
        }

        String databaseName = tablePath.getDatabaseName();
        String dbUrl = baseUrl + databaseName;

        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
            DatabaseMetaData metaData = conn.getMetaData();
            // primaryKey返回结果：
            // CONSTRAINT `PRIMARY` PRIMARY KEY (`id`) NOT ENFORCED
            Optional<UniqueConstraint> primaryKey =
                    getPrimaryKey(
                            metaData,
                            databaseName,
                            getSchemaName(tablePath),
                            getTableName(tablePath));

            PreparedStatement ps =
                    conn.prepareStatement(
                            String.format("SELECT * FROM %s;", getSchemaTableName(tablePath)));
            
            // resultSetMetaData返回结果
            //com.mysql.cj.jdbc.result.ResultSetMetaData@5400db36 - 
            // Field level information:
            //	com.mysql.cj.result.Field@76b74e9c
            //	[dbName=testdb,tableName=books,originalTableName=books,columnName=id,originalColumnName=id,mysqlType=3(FIELD_TYPE_INT),sqlType=4,flags= PRIMARY_KEY, charsetIndex=63, charsetName=ISO-8859-1]
            
            //	com.mysql.cj.result.Field@2d72f75e
            //	[dbName=testdb,tableName=books,originalTableName=books,columnName=title,originalColumnName=title,mysqlType=253(FIELD_TYPE_VARCHAR),sqlType=12,flags=, charsetIndex=45, charsetName=UTF-8]
            
            //	com.mysql.cj.result.Field@8ab78bc
            //	[dbName=testdb,tableName=books,originalTableName=books,columnName=authors,originalColumnName=authors,mysqlType=253(FIELD_TYPE_VARCHAR),sqlType=12,flags=, charsetIndex=45, charsetName=UTF-8]
            
            //	com.mysql.cj.result.Field@5aa0dbf4
            //	[dbName=testdb,tableName=books,originalTableName=books,columnName=year,originalColumnName=year,mysqlType=3(FIELD_TYPE_INT),sqlType=4,flags=, charsetIndex=63, charsetName=ISO-8859-1]
            ResultSetMetaData resultSetMetaData = ps.getMetaData();

            // 创建了一个长度为 resultSetMetaData.getColumnCount()的字符串数组 columnNames
            // [null, null, null, null]
            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
            
            // DataType[4]@3455
            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];

            //1、使用 for循环，从 1到 resultSetMetaData.getColumnCount()遍历结果集的列。
            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                //2、在循环中，将当前列的名称通过 resultSetMetaData.getColumnName(i)方法获取，
                //   并存储到 columnNames数组中，索引为 i - 1。
                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
                //3、调用 fromJDBCType(tablePath, resultSetMetaData, i)方法获取当前列的类型，
                //   并存储到 types 数组中，索引为 i - 1。
                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
                
                //3、使用 resultSetMetaData.isNullable(i)方法判断当前列是否可为空，
                //   如果不可为空（返回值为 ResultSetMetaData.columnNoNulls），
                //   则通过 types[i - 1].notNull()方法将该列的类型设置为非空类型。
                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
                    //4、循环结束后，columnNames 和 types 数组中存储了结果集中每列的名称和类型信息。
                    types[i - 1] = types[i - 1].notNull();
                }
            }
            //columnNames：[id,title,authors,year]
            //types：[1,2,3,4]
            //Schema.newBuilder()：这是一个静态工厂方法，用于创建一个新的Schema.Builder实例。
            //.fromFields(columnNames, types)：这是一个Builder方法，用于从给定的列名和数据类型列表构建Schema。
            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
            
            //pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns())：
            // 这是一个 Lambda表达式，接收一个参数pk（即 Optional中的值），
            // 然后调用 schemaBuilder的primaryKeyNamed方法。
            // pk.getName()和pk.getColumns()分别获取主键的名字和对应的列。
            
            //primaryKey.ifPresent(...)：这是 Optional类的 ifPresent方法，
            // 它会在 Optional中包含值的情况下执行给定的 Lambda表达式。
            primaryKey.ifPresent(pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
            
            //使用schemaBuilder的build方法来创建一个新的Schema实例，并将这个新的实例赋值给变量tableSchema。
            Schema tableSchema = schemaBuilder.build();

            Map<String, String> props = new HashMap<>();
            props.put(CONNECTOR.key(), IDENTIFIER); // 键：CONNECTOR.key()的返回值 connect，值：jdbc。
            props.put(URL.key(), dbUrl);            // 键：url，值：jdbc:mysql://localhost:3306/test。
            props.put(USERNAME.key(), username);    // 键：username，值：root
            props.put(PASSWORD.key(), pwd);         // 键：password，值：root
            props.put(TABLE_NAME.key(), getSchemaTableName(tablePath));  // 键：table-name，值：表名

            // CatalogTable：通常用于描述一个表的元数据，包括其结构（由tableSchema定义）、分区、属性等。
            // tableSchema：这是 CatalogTable的第一个参数，表示表的模式或结构。
            // null：这是 CatalogTable的第二个参数。在这个例子中，它可能代表表的分区信息。
            //       因为这里传入的是null，所以这个表没有分区。
            // Lists.newArrayList()：这是 CatalogTable的第三个参数，可能代表表的其他特性或属性。
            //                       因为这里传入的是一个空列表，所以这个表没有其他特性或属性。
            // props：这是 CatalogTable的最后一个参数，表示表的属性。
            //        这些属性可以包括连接器信息、源信息、目标信息等。
            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
            
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed getting table %s", tablePath.getFullName()), e);
        }
    }

    @Override
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
            throws TableNotExistException, TableAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterTable(
            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public List<String> listViews(String databaseName)
            throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    // ------ partitions ------

    @Override
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
            throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    @Override
    public List<CatalogPartitionSpec> listPartitions(
            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws TableNotExistException, TableNotPartitionedException,
                    PartitionSpecInvalidException, CatalogException {
        return Collections.emptyList();
    }

    @Override
    public List<CatalogPartitionSpec> listPartitionsByFilter(
            ObjectPath tablePath, List<Expression> filters)
            throws TableNotExistException, TableNotPartitionedException, CatalogException {
        
        return Collections.emptyList();
    }

    @Override
    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
    }

    @Override
    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws CatalogException {
        return false;
    }

    @Override
    public void createPartition(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            CatalogPartition partition,
            boolean ignoreIfExists)
            throws TableNotExistException, TableNotPartitionedException,
                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
                    CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void dropPartition(
            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterPartition(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            CatalogPartition newPartition,
            boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // ------ functions ------
    @Override
    public List<String> listFunctions(String dbName)
            throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    @Override
    public CatalogFunction getFunction(ObjectPath functionPath)
            throws FunctionNotExistException, CatalogException {
        throw new FunctionNotExistException(getName(), functionPath);
    }

    @Override
    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
        return false;
    }

    @Override
    public void createFunction(
            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterFunction(
            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // ------ stats ------
    @Override
    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    @Override
    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    @Override
    public CatalogTableStatistics getPartitionStatistics(
            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    @Override
    public CatalogColumnStatistics getPartitionColumnStatistics(
            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    @Override
    public void alterTableStatistics(
            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }
    
    @Override
    public void alterTableColumnStatistics(
            ObjectPath tablePath,
            CatalogColumnStatistics columnStatistics,
            boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException, TablePartitionedException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterPartitionStatistics(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            CatalogTableStatistics partitionStatistics,
            boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void alterPartitionColumnStatistics(
            ObjectPath tablePath,
            CatalogPartitionSpec partitionSpec,
            CatalogColumnStatistics columnStatistics,
            boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    protected List<String> extractColumnValuesBySQL(
            String connUrl,
            String sql,
            int columnIndex,
            Predicate<String> filterFunc,
            Object... params) {

        List<String> columnValues = Lists.newArrayList();

        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
                PreparedStatement ps = conn.prepareStatement(sql)) {
            if (Objects.nonNull(params) && params.length > 0) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }

            ResultSet rs = ps.executeQuery();
            while (rs.next()) {
                String columnValue = rs.getString(columnIndex);
                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                    columnValues.add(columnValue);
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format(
                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
                    e);
        }
    }

    protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
            throws SQLException {
        throw new UnsupportedOperationException();
    }

    protected String getTableName(ObjectPath tablePath) {
        throw new UnsupportedOperationException();
    }

    protected String getSchemaName(ObjectPath tablePath) {
        throw new UnsupportedOperationException();
    }

    protected String getSchemaTableName(ObjectPath tablePath) {
        throw new UnsupportedOperationException();
    }
}
